IOT业务,之前用阿里商用rocketMQ,但设备量上来后,很费钱
so自行搞了一个基于redis list的rightPush+leftPop方案的MQ
IOT网关端无脑rightPush,业务端多线程消费

  • 调用方式
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnProperty(name = "redisPopMq.enable",havingValue = "true")
    public RedisPopMqConsumer redisPopMqConsumer(){
        Properties properties = new Properties();
        properties.put("redisListKey",redisListKey);
        properties.put("threadCount",threadCount);
        RedisPopMqConsumer consumer = DefaultRedisPopMqConsumer.createConsumer(properties,stringRedisTemplate,redisPopMqListener);
        consumer.start();
        log.info("RedisPopMqConsumer start");
        return consumer;
    }
  • DefaultRedisPopMqConsumer核心代码如下
public class DefaultRedisPopMqConsumer implements RedisPopMqConsumer {
	private  volatile boolean isStarted = false;
	private  RedisPopMqListener listener;
	private  String redisListKey;
	private  int threadCount;
	private int retryDelaySeconds;
	private int maxRetryTime = 3;
	private StringRedisTemplate redisTemplate;
	private ThreadPoolExecutor consumerExecutor;
	private  int  statisticsIntervalSeconds = 5;
	private final DelayQueue<DelayedMsg> delayQueue = new DelayQueue<>();

	private static final Logger log = Logger.getLogger(DefaultRedisPopMqConsumer.class.toString());

	private DefaultRedisPopMqConsumer() {}

	public static RedisPopMqConsumer createConsumer(Properties properties,StringRedisTemplate stringRedisTemplate, RedisPopMqListener listener){
		RedisPopMqConsumer consumer = new DefaultRedisPopMqConsumer();
		consumer.setProperties(properties);
		consumer.setRedisTemplate(stringRedisTemplate);
		consumer.setListener(listener);
		return consumer;
	}

	@Override
	public boolean isStarted() {
		return isStarted;
	}

	@Override
	public boolean isClosed() {
		return !isStarted;
	}

	public void  start(){
		if(isStarted)
			return;
		consumerExecutor = new ThreadPoolExecutor(threadCount, threadCount
				, 0L, TimeUnit.SECONDS
				, new LinkedBlockingQueue<>(threadCount*3)
				, new ThreadFactoryBuilder().setNameFormat("redisPopMqConsumerPool-%d").build());
		var  mainExecutor = Executors.newFixedThreadPool(2 + (statisticsIntervalSeconds>0?1:0)
			, new ThreadFactoryBuilder().setNameFormat("redisPopMqMainPool-%d").build());
		isStarted = true;
		mainExecutor.execute(()->consumerRedisMsg());
		mainExecutor.execute(()->consumerRetryDelayMsg());
		//一个定时任务线程,实时统计消费情况
		if(statisticsIntervalSeconds > 0)
			mainExecutor.execute(()->printConsumerStatistics());
		log.info("redisPopMq consumer started");
	}

	//打印消费统计信息
	public void printConsumerStatistics() {
		if(statisticsIntervalSeconds == 0)
			return;
		for (; ; ) {
			if (!isStarted) {
				SleepUtil.sleepMs(1);
				continue;
			}
			var statistics = getConsumerStatistics();
			log.info("redisPopMq consumer statistics:"  + statistics);
			SleepUtil.sleepMs(statisticsIntervalSeconds*1000);
		}
	}

	@Override
	public ConsumerStatistics getConsumerStatistics() {
		if (!isStarted)
			return null;
		return new ConsumerStatistics(
				 consumerExecutor.getActiveCount()
				,consumerExecutor.getPoolSize()
				,consumerExecutor.getQueue().size()
				,consumerExecutor.getCompletedTaskCount()
				,new Date());
	}

	//判断线程池是否已满
	private boolean consumerQueueIsFull(){
		return consumerExecutor.getQueue().size() >= threadCount;
	}




	private void consumerRedisMsg() {
		//无消息时延迟及线程池满时延迟获取时间,线程越少,最大延迟时间越长
		int delayMs = 10+128/threadCount;
		//有消息时延迟时间,线程数大于8时为2ms,小于8时为1ms
		int sleepMs = threadCount>16?0:threadCount>8?1:2;
		for(;;){
			if(!isStarted){
				SleepUtil.sleepRandomMs(10);
				continue;
			}
			if(consumerQueueIsFull()){
				//如果线程池已满,则等待20-35毫秒
				SleepUtil.sleepMs(delayMs);
				continue;
			}
			//不延时获取
			var msg = redisTemplate.opsForList().leftPop(redisListKey, 0L, TimeUnit.SECONDS);
			if(msg != null){
				consumerExecutor.execute(() -> {
					try {
						listener.consume(msg);
					} catch (Exception e) {
						log.warning("redisPopMq consume msg error,msg:{}" + msg + "," + e.getMessage());
						//将msg缓存到延时队列
						delayQueue.put(new DelayedMsg(msg, new Date(System.currentTimeMillis() + retryDelaySeconds * 1000L), 0));
					}
				});
			}else {
				SleepUtil.sleepMs(delayMs);
				continue;
			}
			SleepUtil.sleepMs(sleepMs);
		}
	}

	public void consumerRetryDelayMsg() {
		for(;;){
			if(!isStarted){
				SleepUtil.sleepRandomMs(10);
				continue;
			}
			DelayedMsg delayedMsg =null;
			try {
				delayedMsg = delayQueue.take();
			}catch (InterruptedException e) {
				//ignore
			}
			if(Objects.nonNull(delayedMsg)){
				var msg = delayedMsg.getMsg();
				var retryCount = delayedMsg.getRetryCount();
				consumerExecutor.execute(() -> {
					try {
						listener.consume(msg);
					} catch (Exception e) {
						log.warning("redisPopMq consume delayed msg error,msg:{}" +  msg + e.getMessage());
						//将msg缓存到延时队列
						if(retryCount< maxRetryTime) {
							delayQueue.put(new DelayedMsg(msg, new Date(System.currentTimeMillis() + retryDelaySeconds * 1000L), retryCount + 1));
						}else {
							log.warning("redisPopMq consume delayed msg error > maxRetry time,msg:{}" + msg + e.getMessage());
						}
					}
				});
			}
		}
	}

	@Override
	public void shutdown() {
		if(!isStarted)
			return;
		isStarted = false;
		try {
			consumerExecutor.awaitTermination(3, TimeUnit.SECONDS);
			log.info("redisPopMq consumer shutdown");
		}catch (InterruptedException e){
			//ignore
		}
	}

	@Override
	public void setProperties(final Properties properties) {
		var redisListKey = properties.getProperty("redisListKey");
		if(Objects.isNull(redisListKey) || redisListKey.isBlank())
			throw new IllegalArgumentException("redisListKey is null");
		this.redisListKey = redisListKey;
		var threadCountStr = properties.getProperty("threadCount");
		this.threadCount = parseNum(threadCountStr,4);
		//重试时间
		var retryDelaySecondsStr = properties.getProperty("reTryDelaySeconds");
		this.retryDelaySeconds = parseNum(retryDelaySecondsStr,10);
		//最大重试次数
		var retryTimeStr = properties.getProperty("retryTime");
		this.maxRetryTime = parseNum(retryTimeStr,3);
		var statisticsIntervalSecondsStr = properties.getProperty("statisticsIntervalSeconds");
		this.statisticsIntervalSeconds = parseNum(statisticsIntervalSecondsStr,statisticsIntervalSeconds);

	}

	@Override
	public void setListener(final RedisPopMqListener listener) {
		this.listener = listener;
	}

	@Override
	public void setRedisTemplate(final StringRedisTemplate redisTemplate) {
		this.redisTemplate = redisTemplate;
	}

	private int parseNum(String numStr, int defaultNum){
		if(Objects.isNull(numStr) || numStr.isBlank())
			return defaultNum;
		try{
			return Integer.parseInt(numStr);
		}catch (Exception e){
			return defaultNum;
		}
	}
}